Skip to content

Latest commit

 

History

History
203 lines (146 loc) · 9.24 KB

3. Running a simle word count demo application.md

File metadata and controls

203 lines (146 loc) · 9.24 KB

#III.运行一个简单的单词计数范例

上一篇例子是通过Spring Data和Spring Boot对Hadoop操作和兼容的官方实践,并能够在Hadoop任务监控页面查看。本篇介绍用Eclipse/STS运行一个简单的单词计数器范例项目,用于将文章内的单词各自统计单词出现的次数。

我们之前已经下载了STS,并且能够运行起来上一篇的范例。本篇会继续配置STS,使之能够访问HDFS来上传下载文件,编写单词统计范例并且运行范例。 ##安装插件 首先,我们需要去下载用于支持Hadoop的MapReduce的Eclipse插件, 由于这个插件只有源码,然而编译却是要动手好一个折腾。所以我找了别人编译好的插件包,放到这个页面上。可以拿去下载使用(解压下载的包然后找到release目录,解压release目录下的jar包即可)。

我们解压的插件包文件名是hadoop-eclipse-kepler-plugin-2.2.0.jar,然后我们要将它放到Eclipse解压目录的plugins目录中去,如果使用的是STS,那么应该是在STS-3...RELEASE/plugins目录中。然后我们可以打开STS/Eclipse了,我这里用的是STS。以STS界面示例。

我们需要在STS中打开Windows->Preferences首选项

首选项设置

找到Hadoop Map/Reduce项,填入Hadoop的解压路径。而后可以应用设置并确认完成设置。

设置MapReduce路径

接着我们需要找到Windows->Show View->来打开我们的MapReduce Location视图,如此一来我们可以通过这个视图连接DFS。

Window菜单

选取视图

接着,我们需要在MapReduce视图中创建一个HDFS连接。连接前请确保已经运行了Hadoop集群(单机,伪分布式均可),我们之前设置的hdfs端口为9000,所以我们设置DFS端口为9000。

DFS

然而Hadoop从第二版开始,使用Yarn作为MapReduce框架,那么就保持默认50020端口连接。

确定之后,我们就可以在项目浏览器里看到DFS项下有子项了。 如果看不到,请切换到MapReduce布局。

DFS

DFS

接下来就可以在项目浏览器里查看DFS连接

DFS

查看HDFS当中的目录和文件

DFS

这样一来,我们的HDFS插件就可以安装好了,可以免除我们每次操作文件都要输入命令的痛苦。

为了演示后面的范例,我在这里创建一个input目录,并且上传一个in.txt。 in.txt 文件内容可以是一篇英文文章。

##运行WordCount单词统计范例。

首先我们创建一个项目,通过File->New->Project来选取MapReduce项目,STS会自动为我们创建一个解析好依赖库的Java项目。

New

下一步 DFS

下一步,写好项目名

DFS

下一步,项目目录结构,一般不改

DFS

完成

DFS

这样一来,我们的项目创建完毕。 接下来我们把范例代码拷贝进来

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.ginryan.gohadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

放到src目录下自己定义的包下

Mapred

此时我们的范例可以正常运行了。 另外要说一句,Hadoop内置日志log4j库,运行范例时可能会不出现MapReduce任务相关日志,我们需要复制一个log4j的属性配置文件到src目录下,使之显示日志。 日志文件为: log4j.properties

log4j.rootLogger=DEBUG, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender   
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout   
log4j.appender.stdout.layout.ConversionPattern=%c{1} - %m%n  

目录结构为:

Mapred

接下来就需要配置调试参数了。

点击下拉按钮选择Run Configuration,

Mapred

Mapred

我们选Java Application,并点击新建图标,生成新的运行配置,并命名配置

Mapred

我们需要输入范例中运行的主类,参数Arguments需要配置一下。两个参数分别是输入目录路径和输出目录路径。这两个路径会被传入到代码当中处理,所以取决于范例代码中如何处理这些参数。

Mapred

完成后Apply,并Run,我们就可以在控制台中看到这个范例的输出了。

最后我们可以在DFS视图中看到一个新的目录output1,且有两个文件输出,一个是一个名为_SUCCESS的空文件,这只是一个标志,另一个是part-r-00000, 在part-r-00000中,就是我们的结果文件。我们可以在DFS窗口中直接打开查看输出内容。输出内容统计了每个单词出现的次数。